/*
 * Copyright @ 2017 - Present, 8x8 Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jitsi.videobridge

import org.jitsi.utils.logging2.Logger
import org.jitsi.utils.queue.CountingErrorHandler
import org.jitsi.utils.queue.PacketQueue
import org.jitsi.videobridge.message.BridgeChannelMessage
import org.jitsi.videobridge.message.BridgeChannelMessage.Companion.parse
import org.jitsi.videobridge.message.MessageHandler
import org.jitsi.videobridge.metrics.QueueMetrics
import org.jitsi.videobridge.metrics.VideobridgeMetricsContainer
import org.jitsi.videobridge.util.TaskPools
import org.json.simple.JSONObject
import java.io.IOException
import java.time.Clock

/**
 * Base class for endpoint message transports.
 *
 * Incoming text messages are parsed into [BridgeChannelMessage]s by [onMessage] and placed on an internal
 * queue. The queue's handler passes each message to [handleMessage] and, if that yields a non-null response,
 * sends the response back via [sendMessage] using the source the request arrived on.
 *
 * @param parentLogger the logger from which this transport's child logger is created.
 */
abstract class AbstractEndpointMessageTransport(parentLogger: Logger) : MessageHandler() {

    /** Child logger of the logger supplied at construction, named after the concrete class. */
    protected val logger: Logger = parentLogger.createChildLogger(javaClass.name)

    /** Whether this transport is connected. The exact meaning is defined by the implementing class. */
    abstract val isConnected: Boolean

    /**
     * Queue of parsed messages waiting to be handled.
     *
     * NOTE(review): the meaning of the positional constructor arguments is defined by [PacketQueue], which is
     * not visible here; the inline notes below are assumptions to be confirmed against that class.
     */
    private val incomingMessageQueue: PacketQueue<MessageAndSource> = PacketQueue<MessageAndSource>(
        // Presumably the queue capacity -- confirm against PacketQueue.
        50,
        // Presumably enables queue statistics -- confirm against PacketQueue.
        true,
        INCOMING_MESSAGE_QUEUE_ID,
        { messageAndSource ->
            try {
                // Only messages whose handler produced a response result in something being sent back.
                handleMessage(messageAndSource.message)?.let { response ->
                    sendMessage(messageAndSource.source, response)
                }
            } catch (e: Exception) {
                // A failure while handling one message is logged and does not propagate out of the handler.
                logger.warn("Failed to handle message: ", e)
            }
            // The handler always reports true, including after a logged failure.
            true
        },
        // Presumably the executor on which the handler above runs -- confirm against PacketQueue.
        TaskPools.IO_POOL,
        Clock.systemUTC()
    ).apply { setErrorHandler(queueErrorCounter) }

    /**
     * Hook for signalling that the transport channel is connected.
     *
     * The implementation in this class does nothing; subclasses are expected to override it to fire the
     * message transport ready event for the associated endpoint.
     */
    protected open fun notifyTransportChannelConnected() {}

    /**
     * Notifies this transport that a specific message has been received on a specific transport channel.
     *
     * The message is parsed and, if valid, queued for handling. If parsing fails with an [IOException] a
     * warning is logged and the message is discarded.
     *
     * @param src the transport channel on which the message has been received.
     * @param msg the message that was received.
     */
    fun onMessage(src: Any?, msg: String) {
        val message = try {
            parse(msg)
        } catch (ioe: IOException) {
            logger.warn("Invalid message received (${ioe.message}): $msg")
            return
        }

        logger.debug { "RECV: $msg" }
        incomingMessageQueue.add(MessageAndSource(message, src))
    }

    /**
     * Sends [msg] over the active transport channel of this transport.
     *
     * The implementation in this class does nothing; subclasses override it to actually send.
     */
    protected open fun sendMessage(msg: BridgeChannelMessage) {}

    /**
     * Sends [message] to the destination [dst].
     *
     * The implementation in this class only logs the JSON form of the message at debug level and does not
     * use [dst]; subclasses override it to actually send.
     */
    protected open fun sendMessage(dst: Any?, message: BridgeChannelMessage) =
        logger.debug { "SENT: " + message.toJson() }

    /** Closes this transport. The implementation in this class does nothing. */
    open fun close() {}

    /** A JSON snapshot of debug information; in this class it contains only the received message counts. */
    open val debugState: JSONObject
        get() = JSONObject().apply {
            this["received_counts"] = JSONObject(getReceivedCounts())
        }

    /**
     * Events generated by [AbstractEndpointMessageTransport] types which are of interest to other entities.
     */
    interface EndpointMessageTransportEventHandler {
        /** Called to report that the message transport of [endpoint] is connected. */
        fun endpointMessageTransportConnected(endpoint: AbstractEndpoint)
    }

    /** A parsed message paired with the source it was received from, so a response can be sent back to it. */
    private data class MessageAndSource(val message: BridgeChannelMessage, val source: Any?)

    companion object {
        /** Identifier passed to the incoming message queue. */
        const val INCOMING_MESSAGE_QUEUE_ID = "bridge-channel-message-incoming-queue"

        /** Counter incremented each time the error handler is told a packet was dropped. */
        private val droppedPacketsMetric = VideobridgeMetricsContainer.instance.registerCounter(
            "endpoint_receive_message_queue_dropped_packets",
            "Number of packets dropped out of the Endpoint receive message queue."
        )

        /** Counter incremented each time the error handler is told packet handling failed. */
        private val exceptionsMetric = VideobridgeMetricsContainer.instance.registerCounter(
            "endpoint_receive_message_queue_exceptions",
            "Number of exceptions from the Endpoint receive message queue."
        )

        /**
         * Error handler installed on every incoming message queue. After delegating to
         * [CountingErrorHandler], it increments both the endpoint-specific counter and the matching
         * counter in [QueueMetrics].
         */
        val queueErrorCounter = object : CountingErrorHandler() {
            override fun packetDropped() = super.packetDropped().also {
                droppedPacketsMetric.inc()
                QueueMetrics.droppedPackets.inc()
            }
            override fun packetHandlingFailed(t: Throwable?) = super.packetHandlingFailed(t).also {
                exceptionsMetric.inc()
                QueueMetrics.exceptions.inc()
            }
        }
    }
}
